package com.xiaojie.hadoop.output;

import com.xiaojie.hadoop.bean.CommodityBean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Custom {@link OutputFormat}; its generic parameters are the key/value types
 * produced by the reduce phase.
 *
 * <p>Records are written through a {@code MysqlRecordWriter}. The committer is a
 * {@link FileOutputCommitter} built from the job's configured output directory,
 * which may be absent.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2025/1/7 22:39
 */
public class MysqlOutPutFormat extends OutputFormat<Text, CommodityBean> {

    /** Committer created on the first {@code getOutputCommitter} call and reused afterwards. */
    private FileOutputCommitter committer = null;

    /**
     * Creates the writer that receives the reduce output.
     *
     * @param context the task attempt context (not consulted here)
     * @return a new {@code MysqlRecordWriter}
     */
    @Override
    public RecordWriter<Text, CommodityBean> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        RecordWriter<Text, CommodityBean> writer = new MysqlRecordWriter();
        return writer;
    }

    /**
     * Performs no validation of the output specification.
     *
     * @param context the job context (not consulted here)
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // Nothing is checked for this output format.
    }

    /**
     * Returns the output committer, building it on first use.
     *
     * @param context the task attempt context whose configuration supplies the output directory
     * @return the cached {@link FileOutputCommitter}
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (committer != null) {
            return committer;
        }

        // The output directory is optional; a null path is passed through when it is not configured.
        Path outputPath = null;
        String outputDir = context.getConfiguration().get(FileOutputFormat.OUTDIR);
        if (outputDir != null) {
            outputPath = new Path(outputDir);
        }

        committer = new FileOutputCommitter(outputPath, context);
        return committer;
    }
}

/**
 * {@link RecordWriter} that upserts every reduce output record into the MySQL table
 * {@code hbase_commodity} over JDBC.
 *
 * <p>The connection is opened in the constructor and released in
 * {@link #close(TaskAttemptContext)}. One {@link PreparedStatement} is prepared on the
 * first write and reused for all later records.
 *
 * <p>NOTE(review): the JDBC URL and credentials are hard-coded constants; consider
 * supplying them through the job configuration instead.
 */
class MysqlRecordWriter extends RecordWriter<Text, CommodityBean> {

    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://mysql-master:3306/order?useUnicode=true&characterEncoding=utf8";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    /** Insert-or-update statement; the seventh parameter is the count used when the key already exists. */
    private static final String UPSERT_SQL = "INSERT INTO hbase_commodity (id, commodity_name,picture,price,user_id,count) VALUES (?,?,?,?,?,?)ON DUPLICATE KEY UPDATE count=?;";

    private final Connection connection;

    /** Prepared lazily on the first write so that a failure surfaces as an IOException from write. */
    private PreparedStatement upsertStatement;

    /**
     * Loads the JDBC driver and opens the connection.
     *
     * @throws RuntimeException if the driver class is missing or the connection cannot be opened
     */
    public MysqlRecordWriter() {
        try {
            Class.forName(DRIVER);
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (SQLException | ClassNotFoundException e) {
            throw new RuntimeException("Unable to open MySQL connection to " + URL, e);
        }
    }

    /**
     * Inserts the record, or updates its {@code count} when a row with the same key exists.
     *
     * @param key  the reduce output key; only used in the error message
     * @param bean the record to persist
     * @throws IOException if the record cannot be converted or the statement fails, so the
     *                     framework sees the failure instead of silently losing the row
     */
    @Override
    public void write(Text key, CommodityBean bean) throws IOException {
        try {
            if (upsertStatement == null) {
                upsertStatement = connection.prepareStatement(UPSERT_SQL);
            }
            upsertStatement.setLong(1, Long.parseLong(bean.getCommodityId()));
            upsertStatement.setString(2, bean.getCommodityName());
            upsertStatement.setString(3, bean.getPicture());
            upsertStatement.setString(4, bean.getPrice());
            upsertStatement.setLong(5, Long.parseLong(bean.getUserId()));
            upsertStatement.setLong(6, bean.getCount());
            // Same count again for the ON DUPLICATE KEY UPDATE clause.
            upsertStatement.setLong(7, bean.getCount());
            upsertStatement.executeUpdate();
        } catch (SQLException | RuntimeException e) {
            // RuntimeException covers malformed ids (NumberFormatException) and null beans/fields.
            throw new IOException("Failed to write record with key " + key + " to MySQL", e);
        }
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * @param context the task attempt context (not consulted here)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        closeQuietly(upsertStatement);
        closeQuietly(connection);
    }

    /**
     * Closes a JDBC resource on a best-effort basis.
     *
     * <p>Failures are reported but not rethrown: every statement has already been executed
     * by the time the writer is closed. NOTE(review): this presumes the connection stays in
     * JDBC's default auto-commit mode, which this class never changes — confirm.
     */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

